<?php

declare(strict_types=1);

namespace NanQi\Hope\Listener;

use Hyperf\Framework\Event\OnPipeMessage;
use NanQi\Hope\Base\BaseListener;
use NanQi\Hope\Crontab\Batch\BatchPipeInterface;
use NanQi\Hope\Crontab\Batch\BatchPipeMessage;

class OnBatchPipeMessageListener extends BaseListener
{
    public function listen(): array
    {
        return [
            OnPipeMessage::class,
        ];
    }

    /**
     * Handle the Event when the event is triggered, all listeners will
     * complete before the event is returned to the EventDispatcher.
     */
    public function process(object $event)
    {
        if ($event instanceof OnPipeMessage && $event->data instanceof BatchPipeMessage) {
            /** @var BatchPipeMessage $eventData */
            $eventData = $event->data;
            try {
                $instance = di($eventData->callable);
                if ($instance instanceof BatchPipeInterface) {
                    $instance->pipe($eventData->data, $event->fromWorkerId);
                }
            } catch (\Exception $e) {
                $this->getLog()->error('OnBatchPipe:' . $e->getMessage()
                    . PHP_EOL . $e->getTraceAsString());
            } catch (\Throwable $throwable) {
                $this->getLog()->error('OnBatchPipe:' . $throwable->getMessage()
                    . PHP_EOL . $throwable->getTraceAsString());
            }
        }
    }

}
